package com.huan.kafka.api;

import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;

/**
 * Partitioner that leaves every decision to {@link DefaultPartitioner} and
 * additionally writes a short diagnostic line to standard error whenever a
 * partition is chosen or a new batch is signalled.
 *
 * @author huan.fu 2021/1/5 - 9:57 AM
 */
public class CustomPartitioner extends DefaultPartitioner {

    /**
     * Writes one diagnostic line to standard error.
     *
     * @param line the text to print
     */
    private static void report(String line) {
        System.err.println(line);
    }

    /**
     * Asks the superclass for the partition, prints the chosen partition
     * number to standard error, and returns it unchanged.
     *
     * @param topic      forwarded as-is to the superclass
     * @param key        forwarded as-is to the superclass
     * @param keyBytes   forwarded as-is to the superclass
     * @param value      forwarded as-is to the superclass
     * @param valueBytes forwarded as-is to the superclass
     * @param cluster    forwarded as-is to the superclass
     * @return the partition number computed by the superclass
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        final int assigned = super.partition(topic, key, keyBytes, value, valueBytes, cluster);
        report("消息被分配到分区: " + assigned);
        return assigned;
    }

    /**
     * Prints a notice naming the topic to standard error, then forwards the
     * call to the superclass.
     *
     * @param topic         topic named in the printed notice; also forwarded
     * @param cluster       forwarded as-is to the superclass
     * @param prevPartition forwarded as-is to the superclass
     */
    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // The notice is printed before delegating, so it appears ahead of any superclass work.
        report("topic: " + topic + " 产生了一个新的批次");
        super.onNewBatch(topic, cluster, prevPartition);
    }
}
